package demo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Shared resource for a producer/consumer demo built on a {@link BlockingQueue}.
 *
 * <p>{@link #prod()} and {@link #consumer()} are meant to run on separate threads and loop
 * until the running flag is cleared, either by {@link #stop()} or by the consumer itself
 * when it times out waiting for data. The flag is {@code volatile} so that a write from one
 * thread is guaranteed to become visible to the loops running on the other threads.
 */
@Slf4j
class MyRes {
    /** Seconds to wait on a full queue (producer) or an empty queue (consumer). */
    private static final long QUEUE_TIMEOUT_SECONDS = 2;

    /** Seconds the producer pauses between two consecutive offers. */
    private static final long PRODUCE_INTERVAL_SECONDS = 1;

    /**
     * Loop guard shared by the producer and consumer threads of this instance.
     * Per-instance (not static) so stopping one resource does not stop every other one,
     * and volatile because it is written and read by different threads.
     */
    private volatile boolean running = true;

    /** Source of the sequence numbers used as produced payloads. */
    private final AtomicInteger atomicInteger = new AtomicInteger();

    private final BlockingQueue<String> blockingQueue;

    /**
     * Creates the resource and logs the concrete queue implementation in use.
     *
     * @param blockingQueue queue through which produced items are handed to the consumer
     */
    public MyRes(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        log.info(blockingQueue.getClass().getName());
    }

    /**
     * Produces increasing numbers (as strings) until the running flag is cleared.
     *
     * <p>Each item is offered with a two-second timeout, the outcome is logged, and the
     * producer then sleeps for one second before the next attempt.
     *
     * @throws Exception if the thread is interrupted while offering or sleeping
     */
    public void prod() throws Exception {
        while (running) {
            String data = String.valueOf(atomicInteger.incrementAndGet());
            boolean accepted = blockingQueue.offer(data, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (accepted) {
                log.info("{} 插入队列 {} 成功", Thread.currentThread().getName(),data);
            } else {
                log.info("{} 插入队列 {} 失败", Thread.currentThread().getName(),data);
            }
            TimeUnit.SECONDS.sleep(PRODUCE_INTERVAL_SECONDS);
        }
        log.info("{} 生成叫停了...",Thread.currentThread().getName());
    }

    /**
     * Consumes items until the running flag is cleared or no item arrives within two seconds.
     *
     * <p>On a timeout the consumer clears the running flag itself, which also ends the
     * producer loop, and returns.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for an item
     */
    public void consumer() throws InterruptedException {
        while (running) {
            String data = blockingQueue.poll(QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (data == null) {
                // Nothing arrived in time: shut the whole pipeline down, not just this loop.
                running = false;
                log.info("{} 超过两秒没有取到数据，消费退出", Thread.currentThread().getName());
                return;
            }
            log.info("{} 消费 {} 成功", Thread.currentThread().getName(),data);
        }
    }

    /**
     * Asks the producer and consumer loops to finish; each loop notices the request the
     * next time it re-checks its condition.
     */
    public void stop(){
        running = false;
    }

}


/**
 * @author Javen
 */
@Slf4j
public class BlockingQueueDemo {

    /**
     * 不存元素的阻塞队列，即单元素队列
     */
    public static void synchronousQueueDemo() {
        BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
        new Thread(() ->
        {
            try {
                synchronousQueue.put("a");
                log.info("{} put a", Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                synchronousQueue.put("b");
                log.info("{} put b", Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                synchronousQueue.put("c");
                log.info("{} put c", Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();

        new Thread(() ->
        {
            try {
                TimeUnit.SECONDS.sleep(5);
                log.info("{} {}", Thread.currentThread().getName(), synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                TimeUnit.SECONDS.sleep(5);
                log.info("{} {}", Thread.currentThread().getName(), synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                TimeUnit.SECONDS.sleep(5);
                log.info("{} {}", Thread.currentThread().getName(), synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "B").start();
    }

    public static void main(String[] args) throws InterruptedException {
//        synchronousQueueDemo();

        MyRes res = new MyRes(new ArrayBlockingQueue<>(20));

        new Thread(() -> {
            log.info("{} 生成者启动",Thread.currentThread().getName());
            try {
                res.prod();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"prod").start();

        new Thread(() -> {
            log.info("{} 消费者启动",Thread.currentThread().getName());
            try {
                res.consumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"consumer").start();

        TimeUnit.SECONDS.sleep(5);
        log.info("5秒时间到，老板叫停....");
        res.stop();
    }
}
